package com.MutiModule.common.utils.hash;

import java.util.ArrayList;
import java.util.Collection;
import java.util.SortedMap;
import java.util.TreeMap;

import com.google.common.base.Charsets;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;

/**
 * 一致性Hash 简单测试代码
 * @author alexgaoyh
 * 	基本场景：
 * 		如果有N个cache，如何将一个对象映射到N个cache上。映射行为可能会采用 Hash(object)%N 的方式；
 * 		但是如果某个cache挂掉了，映射行为会采用Hash(object)%(N-1) 的方式； 新增cache，映射行为采用 Hash(object)%(N+1) 的方式；
 * 		这种情况下，会造成所有的cache 都失效，所以，采用其他方式解决cache失效的问题；
 * 
 *  一致性哈希算法的基本实现原理是将机器节点和key值都按照一样的hash算法映射到一个0~2^32的圆环上。
 *		当有一个写入缓存的请求到来时，计算Key值k对应的哈希值Hash(k)，如果该值正好对应之前某个机器节点的Hash值，则直接写入该机器节点，
 *		如果没有对应的机器节点，则顺时针查找下一个节点，进行写入，如果超过2^32还没找到对应节点，则从0开始查找(因为是环状结构)；
 *
 *		经过一致性哈希算法散列之后，当有新的机器加入时，将只影响一台机器的存储情况，
 *			例如新加入的节点H的散列在B与C之间，则原先由C处理的一些数据可能将移至H处理，而其他所有节点的处理情况都将保持不变，因此表现出很好的单调性。
 *			而如果删除一台机器，例如删除C节点，此时原来由C处理的数据将移至D节点，而其它节点的处理情况仍然不变。
 *			而由于在机器节点散列和缓冲内容散列时都采用了同一种散列算法，因此也很好得降低了分散性和负载。而通过引入虚拟节点的方式，也大大提高了平衡性。
 *				具体机器映射时，还可以根据处理能力不同，将一个实体节点映射到多个虚拟节点
 * @param <T>
 */
public class ConsistentHash<T> {

    private final HashFunction          hashFunction;                      // 所用的hash函数
    private final int                   numberOfReplicas;                  // server虚拟节点倍数(100左右比较合理)
    private final SortedMap<Integer, T> circle = new TreeMap<Integer, T>(); // server节点分布圆

    /**
     * 初始化一致性hash算法
     * @param hashFunction 
     * @param numberOfReplicas
     * @param nodes server节点集合
     */
    public ConsistentHash(HashFunction hashFunction, int numberOfReplicas, Collection<T> nodes){
        this.hashFunction = hashFunction;
        this.numberOfReplicas = numberOfReplicas;

        for (T node : nodes) {
            add(node);
        }
    }

    /**
     * 加入server节点
     * @param node 
     */
    public void add(T node) {
        for (int i = 0; i < numberOfReplicas; i++) {
            circle.put(hashFunction.hashString(node.toString() + i, Charsets.UTF_8).asInt(), node);
        }
    }

    /**
     * 移除server节点
     * @param node
     */
    public void remove(T node) {
        for (int i = 0; i < numberOfReplicas; i++) {
            circle.remove(hashFunction.hashString(node.toString() + i, Charsets.UTF_8).asInt());
        }
    }

    /**
     * 获取client对应server节点
     * @param key
     * @return
     */
    public T get(Object key) {
        if (circle.isEmpty()) {
            return null;
        }
        
        //生成client对应的hash值
        int hash = hashFunction.hashString(key.toString(), Charsets.UTF_8).asInt();
        
        //如果没有对应此hash的server节点，获取大于等于此hash后面的server节点；如果还没有，则获取server节点分布圆的第一个节点
        if (!circle.containsKey(hash)) {
            SortedMap<Integer, T> tailMap = circle.tailMap(hash);
            hash = tailMap.isEmpty() ? circle.firstKey() : tailMap.firstKey();
        }
        return circle.get(hash);
    }

    /********************** 测试代码**********start ************************/

    public static void main(String[] args) {

        ArrayList<String> nodeList = new ArrayList<String>();
        nodeList.add("redis1");
        nodeList.add("redis2");
        nodeList.add("redis3");
        nodeList.add("redis4");
        
        HashFunction hf = Hashing.md5();
        
        ConsistentHash<String> consistentHash = new ConsistentHash<String>(hf, 100, nodeList);
        
        String[] userIds = 
            {"-84942321036308",
            "-76029520310209",
            "-68343931116147",
            "-54921760962352"
            };
        for (String userId : userIds) {
        	//根据一致性hash算法获取客户端对应的服务器节点
        	System.out.println(consistentHash.get(userId));
        }
        
        System.out.println("-----------------------新增节点");
        
        //新增节点
        nodeList.add("redis5");
        consistentHash = new ConsistentHash<String>(hf, 100, nodeList);
        for (String userId : userIds) {
        	//根据一致性hash算法获取客户端对应的服务器节点
        	System.out.println(consistentHash.get(userId));
        }
        
        
        System.out.println("-----------------------删除节点");
        nodeList.remove(0);
        consistentHash = new ConsistentHash<String>(hf, 100, nodeList);
        for (String userId : userIds) {
        	//根据一致性hash算法获取客户端对应的服务器节点
        	System.out.println(consistentHash.get(userId));
        }
        
    }

    /********************** 测试代码******************end ****************/

}